// msg_tool/scripts/kirikiri/archive/xp3/segmenter.rs

1use super::reader::Reader;
2use anyhow::Result;
3use fastcdc::v2020::StreamCDC;
4use serde::Deserialize;
5use std::collections::HashMap;
6use std::io::Read;
7use std::sync::Arc;
8
9#[derive(Clone, Debug, Deserialize)]
10pub struct Segments {
11    segments: Arc<HashMap<String, Vec<u64>>>,
12    #[serde(default)]
13    default_config: Box<SegmenterConfig>,
14}
15
16#[derive(Clone, Debug, Deserialize)]
17#[serde(tag = "@type")]
18/// Configuration options for the segmenter.
19pub enum SegmenterConfig {
20    /// Do not segment the data.
21    None,
22    /// Use the FastCDC algorithm with specified minimum, average, and maximum chunk sizes.
23    FastCdc {
24        min_size: u32,
25        avg_size: u32,
26        max_size: u32,
27    },
28    /// Use fixed-size segments.
29    Fixed(usize),
30    Custom(Segments),
31}
32
33impl Default for SegmenterConfig {
34    fn default() -> Self {
35        SegmenterConfig::FastCdc {
36            min_size: 32 * 1024,
37            avg_size: 256 * 1024,
38            max_size: 8 * 1024 * 1024,
39        }
40    }
41}
42
43impl SegmenterConfig {
44    pub fn is_none(&self) -> bool {
45        matches!(self, SegmenterConfig::None)
46    }
47}
48
49/// A trait for strategies that split a byte slice into one or more segments.
50pub trait Segmenter {
51    fn segment<'a>(
52        &'a self,
53        data: &'a mut Reader,
54        filename: &'a str,
55    ) -> Box<dyn Iterator<Item = Result<Vec<u8>>> + 'a>;
56}
57
/// Content-defined segmenter backed by FastCDC (2020 variant).
pub struct FastCdcSegmenter {
    /// Minimum chunk size passed to FastCDC.
    min_size: u32,
    /// Target average chunk size passed to FastCDC.
    avg_size: u32,
    /// Maximum chunk size passed to FastCDC.
    max_size: u32,
}
63
64impl Segmenter for FastCdcSegmenter {
65    fn segment<'a>(
66        &'a self,
67        data: &'a mut Reader,
68        _filename: &'a str,
69    ) -> Box<dyn Iterator<Item = Result<Vec<u8>>> + 'a> {
70        let cdc = StreamCDC::new(
71            data,
72            self.min_size as usize,
73            self.avg_size as usize,
74            self.max_size as usize,
75        );
76        Box::new(cdc.map(|chunk| Ok(chunk?.data)))
77    }
78}
79
/// Segmenter that cuts input into consecutive pieces of one fixed length.
pub struct FixedSizeSegmenter {
    /// Length in bytes of every segment except possibly the last.
    size: usize,
}
83
84impl Segmenter for FixedSizeSegmenter {
85    fn segment<'a>(
86        &'a self,
87        data: &'a mut Reader,
88        _filename: &'a str,
89    ) -> Box<dyn Iterator<Item = Result<Vec<u8>>> + 'a> {
90        let size = self.size;
91        let mut buf = vec![0; size];
92        Box::new(std::iter::from_fn(move || {
93            let nbuf = &mut buf;
94            let mut total_read = 0;
95            while total_read < size {
96                match data.read(&mut nbuf[total_read..]) {
97                    Ok(0) => break, // EOF
98                    Ok(n) => total_read += n,
99                    Err(e) => return Some(Err(e.into())),
100                }
101            }
102            if total_read == 0 {
103                None // No more data to read
104            } else {
105                Some(Ok(buf[..total_read].to_vec()))
106            }
107        }))
108    }
109}
110
111pub struct CustomSegmenter {
112    segments: Arc<HashMap<String, Vec<u64>>>,
113    inner: Box<dyn Segmenter + Send + Sync>,
114}
115
116impl Segmenter for CustomSegmenter {
117    fn segment<'a>(
118        &'a self,
119        data: &'a mut Reader,
120        filename: &'a str,
121    ) -> Box<dyn Iterator<Item = Result<Vec<u8>>> + 'a> {
122        if let Some(segment_offsets) = self.segments.get(filename) {
123            let mut current_seg_idx = 0;
124            let mut reached_eof = false;
125
126            Box::new(std::iter::from_fn(move || {
127                if reached_eof {
128                    return None;
129                }
130
131                // 获取当前 Reader 的绝对位置
132                let current_pos = data.total_readed();
133
134                if current_seg_idx < segment_offsets.len() {
135                    // 1. 处理预设的分割点
136                    let target_pos = segment_offsets[current_seg_idx];
137                    current_seg_idx += 1;
138
139                    if target_pos <= current_pos {
140                        // 如果分割点无效(小于当前位置),跳过或视作该段为空
141                        return Some(Ok(Vec::new()));
142                    }
143
144                    let to_read = (target_pos - current_pos) as usize;
145                    let mut buf = vec![0; to_read];
146                    let mut total_read = 0;
147
148                    while total_read < to_read {
149                        match data.read(&mut buf[total_read..]) {
150                            Ok(0) => {
151                                reached_eof = true;
152                                break;
153                            }
154                            Ok(n) => total_read += n,
155                            Err(e) => return Some(Err(e.into())),
156                        }
157                    }
158
159                    if total_read == 0 && reached_eof {
160                        None
161                    } else {
162                        buf.truncate(total_read);
163                        Some(Ok(buf))
164                    }
165                } else {
166                    // 2. 处理“最后一个分割点之后”的剩余数据 (Tail)
167                    // 标记为已到达末尾,保证下一次调用返回 None
168                    reached_eof = true;
169
170                    let mut final_buf = Vec::new();
171                    let mut temp_buf = [0u8; 8192]; // 临时缓冲区用于读取剩余所有内容
172
173                    loop {
174                        match data.read(&mut temp_buf) {
175                            Ok(0) => break,
176                            Ok(n) => final_buf.extend_from_slice(&temp_buf[..n]),
177                            Err(e) => return Some(Err(e.into())),
178                        }
179                    }
180
181                    if final_buf.is_empty() {
182                        None
183                    } else {
184                        Some(Ok(final_buf))
185                    }
186                }
187            }))
188        } else {
189            self.inner.segment(data, filename)
190        }
191    }
192}
193
194pub fn create_segmenter(config: &SegmenterConfig) -> Option<Box<dyn Segmenter + Send + Sync>> {
195    match config {
196        SegmenterConfig::None => None,
197        SegmenterConfig::FastCdc {
198            min_size,
199            avg_size,
200            max_size,
201        } => Some(Box::new(FastCdcSegmenter {
202            min_size: *min_size,
203            avg_size: *avg_size,
204            max_size: *max_size,
205        })),
206        SegmenterConfig::Fixed(size) => Some(Box::new(FixedSizeSegmenter { size: *size })),
207        SegmenterConfig::Custom(manifest) => Some(Box::new(CustomSegmenter {
208            segments: manifest.segments.clone(),
209            inner: match create_segmenter(&manifest.default_config) {
210                Some(cfg) => cfg,
211                None => {
212                    return None;
213                }
214            },
215        })),
216    }
217}